Flink DataStream to Table с помощью SQL: присоединиться

Flink

Внутреннее соединение эквивалентно глобальному окну, и предыдущее сообщение было сохранено.Когда приходит сообщение, которое может быть связано, выводится декартово произведение нового сообщения!

package SQL;
 
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;
 
/**
 * @Author you guess
 * @Date 2021/1/6 15:22
 * @Version 1.0
 * @Desc
 */
public class DataStreamSql1_Join {
    private static final Logger LOG = LoggerFactory.getLogger(MinMinByMaxMaxBy.MinMinByMaxMaxByTest.class);
    private static final String[] TYPE = {"a苹果", "b梨", "c西瓜", "d葡萄", "e火龙果"};
 
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, envSettings);
 
        //添加自定义数据源,每秒发出一笔订单信息{商品名称,商品数量}
        DataStreamSource<Order> orderSourceA = env.addSource(new SourceFunction<Order>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();
 
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    Order order = new Order(TYPE[random.nextInt(TYPE.length)], Long.valueOf(random.nextInt(10)));
                    System.out.println(new Date() + ",orderSourceA提交元素:" + order);
                    ctx.collect(order);
                }
            }
 
            @Override
            public void cancel() {
                isRunning = false;
            }
 
        }, "order-infoA");
 
 
        DataStreamSource<Order> orderSourceB = env.addSource(new SourceFunction<Order>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();
 
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    Order order = new Order(TYPE[random.nextInt(TYPE.length)], Long.valueOf(random.nextInt(10)));
                    System.out.println(new Date() + ",orderSourceB提交元素:" + order);
                    ctx.collect(order);
                }
            }
 
            @Override
            public void cancel() {
                isRunning = false;
            }
 
        }, "order-infoB");
 
        stEnv.registerDataStream("tableA", orderSourceA);
        stEnv.registerDataStream("tableB", orderSourceB);
 
        Table result = stEnv.sqlQuery("SELECT A.name,A.qtty,B.qtty from tableA A inner join tableB  B on A.name = B.name");
        stEnv.toRetractStream(result, Row.class).print();//这里要用Row类型
        env.execute("Flink Streaming Java API Skeleton");
 
    }
 
 
    /**
     * Simple POJO.
     */
    public static class Order {
        public String name;
        public Long qtty;
 
 
        public Order() {
        }
 
        public Order(String name, Long qtty) {
            this.name = name;
            this.qtty = qtty;
        }
 
        @Override
        public String toString() {
            return "Order{" +
                    "name='" + name + '\'' +
                    ", qtty=" + qtty +
                    '}';
        }
    }
}
 

Ср, 06 января, 15:28:04 CST 2021, элемент отправки orderSourceA: Order{name='d виноград', qtty=9}

Ср, 06 января, 15:28:04 CST 2021, элемент отправки orderSourceB: Order{name='bear', qtty=8}

Ср, 06 января, 15:28:05 CST 2021, элемент отправки orderSourceA: Order{name='bear', qtty=4}

Ср, 06 января, 15:28:05 CST 2021, элемент отправки orderSourceB: Order{name='bear', qtty=6}

5> (правда,bpear,4,8)

5> (правда,bpear,4,6)

Ср, 06 января, 15:28:06 CST 2021, элемент отправки orderSourceA: Order{name='a apple', qtty=3}

Ср, 06 января, 15:28:06, CST 2021, элемент отправки orderSourceB: Order{name='e драконий фрукт', qtty=1}

Ср, 06 января, 15:28:07, CST 2021, элемент отправки orderSourceA: Order{name='e Dragon Fruit', qtty=4}

Ср, 06 января, 15:28:07 CST 2021, элемент отправки orderSourceB: Order{name='a apple', qtty=6}

9> (правда, яблоко, 3, 6)

4> (true,eDragonfruit,4,1)

Ср, 06 января, 15:28:08 CST 2021, элемент отправки orderSourceB: Order{name='e драконий фрукт', qtty=9}

Ср, 06 января, 15:28:08, CST 2021, элемент отправки orderSourceA: Order{name='e фрукт дракона', qtty=6}

4> (правда, eDragonfruit, 4,9)

4> (правда, eDragonfruit, 6,1)

4> (правда, eDragonfruit, 6,9)

Среда, 06 января, 15:28:09 CST 2021, элемент отправки orderSourceA: Order{name='e фрукт дракона', qtty=1}

Ср, 06 января, 15:28:09 CST 2021, элемент отправки orderSourceB: Order{name='e фрукт дракона', qtty=2}

4> (true,eDragonfruit,4,2)

4> (правда, eDragonfruit, 6,2)

4> (true,eDragonfruit,1,1)

4> (true,eDragonfruit,1,2)

4> (правда, eDragonfruit, 1,9)

Ср, 06 января, 15:28:10 CST 2021, элемент отправки orderSourceA: Order{name='c арбуз', qtty=5}

Ср, 06 января, 15:28:10 CST 2021, элемент отправки orderSourceB: Order{name='e драконий фрукт', qtty=6}

4> (true,eDragonfruit,4,6)

4> (правда, eDragonfruit, 6, 6)

4> (true,eDragonfruit,1,6)

Ср, 06 января, 15:28:11 CST 2021, элемент отправки orderSourceA: Order{name='c арбуз', qtty=6}

Ср, 06 января, 15:28:11 CST 2021, элемент отправки orderSourceB: Order{name='c арбуз', qtty=0}

8> (правда,cарбуз,5,0)

8> (правда,cарбуз,6,0)

Ср, 06 января, 15:28:12 CST 2021, элемент отправки orderSourceA: Order{name='bear', qtty=2}

Среда, 06 января, 15:28:12 CST 2021, элемент отправки orderSourceB: Order{name='d виноград', qtty=7}//Нет элемента отправки с orderSourceA до: Order{name='d виноград', qtty= 9} Совпало, так что вывода нет, а теперь можно сопоставить! !

3> (правда,d виноград,9,7)

5> (правда,bpear,2,8)

5> (правда,bpear,2,6)

Ср, 06 января, 15:28:13 CST 2021, элемент отправки orderSourceB: Order{name='d виноград', qtty=1}

Ср, 06 января, 15:28:13 CST 2021, элемент отправки orderSourceA: Order{name='d виноград', qtty=5}

3> (правда,d виноград,9,1)

3> (правда,d виноград,5,1)

3> (правда,d виноград,5,7)

Ср, 06 января, 15:28:14 CST 2021, элемент отправки orderSourceB: Order{name='d виноград', qtty=5}

Ср, 06 января, 15:28:14, CST 2021, элемент отправки orderSourceA: Order{name='e фрукт дракона', qtty=9}

3> (правда,d виноград,5,5)

3> (правда,d виноград,9,5)

4> (правда, eDragonfruit,9,6)

4> (правда, eDragonfruit, 9,1)

4> (правда, eDragonfruit, 9,2)

4> (правда, eDragonfruit,9,9)

Ср, 06 января, 15:28:15 CST 2021, orderSourceB submit element: Order{name='e dragon fruit', qtty=5} //Каждый раз, когда приходит новый элемент, если он может совпадать, он выводит декартово произведение

Ср, 06 января, 15:28:15 CST 2021, orderSourceA submit element: Order{name='e dragon fruit', qtty=4} //Каждый раз, когда приходит новый элемент, если он может совпадать, он выводит декартово произведение

4> (true,eDragonfruit,4,5)

4> (правда, eDragonfruit, 6,5)

4> (правда, eDragonfruit, 1,5)

4> (правда, eDragonfruit, 9,5)

4> (true,eDragonfruit,4,5)

4> (true,eDragonfruit,4,6)

4> (true,eDragonfruit,4,1)

4> (true,eDragonfruit,4,2)

4> (правда, eDragonfruit, 4,9)

Синтаксис разных версий flink сильно различается:

<flink.version>1.9.2</flink.version>

jdk1.8